package org.com.gr.capserver.config.mqtt.receive.app;

import lombok.extern.slf4j.Slf4j;
import org.com.gr.capserver.config.mqtt.factory.MqttMessageHandlerStrategy;
import org.com.gr.capserver.model.Cap;
import org.com.gr.capserver.model.Controller;
import org.com.gr.capserver.service.PTZHandlerService;
import org.com.gr.capserver.utils.MongoHelper;
import org.com.gr.capserver.utils.constant.BaseConstant;
import org.com.gr.capserver.utils.hikvisionSDK.SDKBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.management.Query;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ClassName: CapsMessageHandlerStrategy
 * Description:
 *
 * @author binbin_hao
 * @date 2023/9/8 15:00
 */
@Component
@Slf4j
public class CapsMessageHandlerStrategy implements MqttMessageHandlerStrategy {

    @Autowired
    PTZHandlerService ptzHandlerService;
    @Autowired
    ThreadPoolTaskExecutor customAsyncThreadPool;
    @Override
    public String getTopic() {
        return BaseConstant.CAP_PTZ_TOPIC;
    }

    private final ConcurrentHashMap<String, Queue<Message>> messageMa=new ConcurrentHashMap<>();

    @Override
    public void handle(Message message) {
        try {
            String[] topics=message.getHeaders().get(MqttHeaders.TOPIC).toString().split("/");
            //摄像头序列号
            String deviceNum=topics[2];
            Queue<Message> queue =  messageMa.computeIfAbsent(deviceNum,k->new ConcurrentLinkedQueue<>());
            //指令
            queue.add(message);
            customAsyncThreadPool.submit(()->{
                synchronized (queue){
                    log.info("当前线程:{}",Thread.currentThread().getId());
                    while (!queue.isEmpty()){
                        Message message1=queue.poll();
                        String[] topics1=message1.getHeaders().get(MqttHeaders.TOPIC).toString().split("/");
                        //摄像头序列号
                        String deviceNum1=topics1[2];
                        String command1=topics1[3];
                        Controller controller=new Controller();
                        controller.setCommand(Integer.parseInt(command1));
                        controller.setKey(deviceNum1);
                        ptzHandlerService.controllerstart(controller);
                    }
                }
            });
        } catch (NumberFormatException e) {
            log.error("控制摄像头异常，异常信息：{}",e.getMessage());
            e.printStackTrace();
        }

    }
}
